package com.hanxiaozhang.io.makereactor_4;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2021/8/25
 * @since 1.0.0
 */
public class SelectorThreadGroup {

    private SelectorThread[] sts = null;
    private ServerSocketChannel server = null;
    private AtomicInteger xid = new AtomicInteger(0);

    /**
     * @param num 线程数
     */
    public SelectorThreadGroup(int num) {
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            sts[i] = new SelectorThread(this);
            Thread thread = new Thread(sts[i]);
            thread.start();
        }
    }


    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));

            // 注册到那个selector上？
            nextSelector(server);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 无论 是ServerSocket socket 都复用这个方法
     * 0上固定注册一个listen
     * 1和2注册客户端
     *
     * @param server
     */
    public void nextSelector(Channel server) {

        try {
            if (server instanceof ServerSocketChannel) {
                sts[0].lbq.put(server);
                sts[0].selector.wakeup();
            } else {
                // 在主线程中，取到堆中selectorThread对象
                SelectorThread st = next();
                // 1.通过队列传递数据，消息
                st.lbq.add(server);
                // 2.通过打断阻塞，让对应的线程去自己处理在打断完成的注册selector
                st.selector.wakeup();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private SelectorThread next() {
        // 轮询，容易发生倾斜
        int index = xid.incrementAndGet() % (sts.length - 1);
        return sts[index + 1];
    }

}
